Skip to content

Conversation

ChrisWrenDev
Copy link
Contributor

Issue: #347

This PR refactors how producers send messages to reduce the size of generated async futures and to fix unnecessary lock contention during batching.

The public API and behavior are unchanged — these are internal improvements.

Motivation
Oversized async futures (~16 KB):

  • Previously, Producer::send_non_blocking and related methods held on to the network send future returned by connection.sender().send(..) across an .await.
  • That network future captures a large amount of state internally, which forced the compiler to generate async state machines that were ~16 KB in size.
  • This bloated types like SendFuture and Result<SendFuture, Error> and propagated large futures into downstream code.

Unnecessary batch lock contention:

  • In batching mode, the code serialized every message while still holding the batch mutex.
  • Serialization can involve allocations and compression, so other tasks trying to enqueue messages were blocked on the lock for longer than necessary.

Changes
Introduced send_inner_retry / start_send_once helpers:

  • The network send future is created synchronously and immediately wrapped into a much smaller future.
  • No large future is ever held across an .await.
  • The retry loop logic for reconnects and IO errors is preserved exactly as before.

Refactored batching (send_batch and batching branch of send_raw):

  • Messages are drained from the queue under the lock, then the lock is released.
  • Only after releasing the lock are the messages serialized and compressed.
  • This ensures lock contention is minimal, while maintaining the same batching and receipt fan-out behavior.

Benefits

  • Futures are small and cheap: SendFuture itself remains just 8 bytes, and callers of send_non_blocking no longer cause giant state machines to be generated.
  • Better concurrency: The batch mutex is only held for lightweight queue operations. Other producers can continue pushing messages while serialization happens.
  • Behavior preserved: Compression, retries, batching, and public APIs are unchanged.

Testing

  • Verified with -Zprint-type-sizes that the generated futures are now small (hundreds of bytes instead of ~16 KB).
  • Verified against Risingwave locally that clippy::large_future errors are resolved.

@ChrisWrenDev
Copy link
Contributor Author

All of the changes in send_non_blocking could alternatively be simplified by boxing the network future. That would avoid the refactor but introduce a heap allocation on every send.

I kept the current approach to eliminate the allocation and keep futures lightweight, but I’m open to reverting to the boxed-future solution if reviewers prefer the safer path. Given that Producer currently lacks direct tests, it’s harder to guarantee no behavioral regressions from the refactor.

@ChrisWrenDev
Copy link
Contributor Author

@freeznet tagging you for review. Thanks!

@BewareMyPower
Copy link
Contributor

but introduce a heap allocation on every send.

IMO, heap allocation is acceptable.

BTW, could you use another PR to improve the batch lock contention? It seems to be a separated issue with the large future size.

src/producer.rs Outdated
Comment on lines 795 to 792
let f = async move {
let res = fut.await;
res.map_err(|e| {
error!("wait send receipt got error: {:?}", e);
Error::Producer(ProducerError::Connection(e))
})
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary to reduce the future size? After putting ensure_connected().await before this method, the connection send future won't be held across an .await.

BTW, could you share more details about the outputs of build with the -Zprint-type-sizes flag? i.e. which future's type size is large, as well as the size after the change? It would be helpful to verify if any regression could be introduced in future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By doing ensure_connected().await before creating the send future, then creating the send future in a sync function start_send_once and returning a thin adapter (e.g., Ok(async move { fut.await.map_err(..) })), the outer async fn no longer holds the big future and now only returns a small wrapper.

I will rerun -Zprint-type-sizes on the current branch and on main to regenerate a before/after comparison to illustrate the changes.

@ChrisWrenDev ChrisWrenDev force-pushed the fix/producer-small-futures branch from ebb57e6 to 14f8ac2 Compare August 27, 2025 15:36
@ChrisWrenDev ChrisWrenDev force-pushed the fix/producer-small-futures branch from c149c34 to 3268d17 Compare August 28, 2025 15:46
@ChrisWrenDev
Copy link
Contributor Author

After investigating with -Zprint-type-sizes, I couldn’t observe meaningful type size differences directly in the pulsar-rs crate, but in RisingWave I was able to see sizes. Interestingly, the reported sizes were under Clippy’s large_futures threshold of 16 384 bytes, and my earlier changes actually made the future slightly larger—even though they silenced the Clippy warnings.

This seems to be due to the way Clippy estimates future sizes, which can yield false positives. (See discussion in risingwavelabs/risingwave#22971)

After stripping back my earlier refactor, I found that the only change needed to eliminate the downstream Clippy large_futures warnings in RisingWave was making send_compress synchronous. Since compression is CPU-bound and doesn’t benefit from async, this removes an unnecessary state machine in the call chain. While not required for actual size reduction, it silences Clippy downstream and makes the code simpler.

Before:

type: `{async fn body of sink::pulsar::PulsarPayloadWriter<'_>::send_message()}`: 16184 bytes, alignment: 8 bytes
print-type-size         local `.__awaitee`: 15656 bytes, alignment: 8 bytes, 
type: {async fn body of pulsar::Producer<pulsar::TokioExecutor>::send_non_blocking<pulsar::producer::Message>()}

After:

type: `{async fn body of sink::pulsar::PulsarPayloadWriter<'_>::send_message()}`: 16728 bytes, alignment: 8 bytes
local `.__awaitee`: 16200 bytes, alignment: 8 bytes, type: {async fn body of pulsar::Producer<pulsar::TokioExecutor>::send_non_blocking<pulsar::producer::Message>()}

Compression Only:

type: `{async fn body of sink::pulsar::PulsarPayloadWriter<'_>::send_message()}`: 16056 bytes, alignment: 8 bytes
local `.__awaitee`: 15528 bytes, alignment: 8 bytes, type: {async fn body of pulsar::Producer<pulsar::TokioExecutor>::send_non_blocking<pulsar::producer::Message>()}

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a PR to support batch timeout: #354, this PR abstracts a synchronous compress_message function as well.

It involves much code refactoring so that it will have conflicts with this PR, I'd like to hold this PR for a while. Feel free to review my PR as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants